package com.github.hgkmail.hello.interview.multithread;

import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

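// Concurrency has two halves: mutual exclusion (synchronized, AQS-based locks) and cooperation (conditions).
// A condition is essentially a condition queue: wait() releases the lock and parks the thread on that queue.
// After notify()/notifyAll() the thread moves to the lock's wait queue, must re-acquire the lock, and should
// then re-check its condition in a loop, because being woken does not guarantee the condition still holds.
// notify() wakes one arbitrarily chosen thread on the condition queue; notifyAll() wakes every thread on it
// (effectively draining the condition queue at once).
// Analogy: the lock wait queue is like a primary-key index (exactly one per lock); condition queues are like
// secondary indexes (there may be several). Threads on a condition queue must eventually go through the lock
// wait queue and compete for the lock.
// A synchronized monitor has exactly one condition queue.
// An AQS-based lock can have several condition queues.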
/**
 * Producer/consumer experiments that contrast plain, unsynchronized {@link Deque}s
 * with the {@code MyAQS.SyncStack} / {@code MyAQS.SyncQueue} wrappers.
 *
 * <p>Every demo starts {@link #THREAD_NUM} producer threads and {@link #THREAD_NUM}
 * consumer threads. Each worker performs 1000 steps, sleeping 1 ms after every step.
 * Producers push the value {@code 1}; consumers add whatever they take to {@link #sum}.
 *
 * <p>The {@code *WithoutSync} demos are intentionally racy: they exist to show what goes
 * wrong when a non-thread-safe collection is shared without coordination.
 */
public class TestCondition {
    /** Number of producer threads and, separately, of consumer threads in each demo. */
    public static final int THREAD_NUM = 10;

    /** Number of steps each worker thread performs. */
    private static final int ITERATIONS = 1000;

    /** Pause after each step, in milliseconds. */
    private static final long PAUSE_MILLIS = 1L;

    /** Total of all consumed values; worker threads update it only through {@link #increase(int)}. */
    public int sum = 0;

    // LinkedList is kept on purpose: it is not thread-safe, which is what the *WithoutSync demos rely on.
    private final Deque<Integer> stack = new LinkedList<>();
    private final MyAQS.SyncStack syncStack = new MyAQS.SyncStack();

    private final Deque<Integer> queue = new LinkedList<>();
    // NOTE(review): the argument 10 is presumably the queue capacity; confirm in MyAQS.SyncQueue.
    private final MyAQS.SyncQueue syncQueue = new MyAQS.SyncQueue(10);

    /** One unit of producer or consumer work; may block and therefore may be interrupted. */
    private interface Step {
        void perform() throws InterruptedException;
    }

    /**
     * Runs a {@link Step} {@link #ITERATIONS} times, pausing after each one, and then
     * optionally arrives at a barrier.
     *
     * <p>Interruption is treated as a request to stop: the interrupt status is restored and the
     * loop ends instead of carrying on with a cleared flag. A worker that has a barrier still
     * arrives at it; with the flag set, {@code await()} fails at once and breaks the barrier,
     * which releases the other parties that are waiting on it rather than leaving them stuck.
     */
    private static final class Worker implements Runnable {
        private final Step step;
        private final CyclicBarrier barrier; // null when the caller waits with join() instead

        Worker(Step step, CyclicBarrier barrier) {
            this.step = step;
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                for (int j = 0; j < ITERATIONS; j++) {
                    step.perform();
                    Thread.sleep(PAUSE_MILLIS);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt(); // keep the interrupt visible to later blocking calls
            }
            if (barrier != null) {
                arriveAtBarrier();
            }
        }

        /** Waits at the barrier, reporting (but not rethrowing) a failure to rendezvous. */
        private void arriveAtBarrier() {
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    /** Adds {@code a} to {@link #sum} while holding this object's monitor. */
    private synchronized void increase(int a) {
        sum += a;
    }

    /**
     * Creates and immediately starts {@link #THREAD_NUM} threads that all run {@code step}.
     *
     * @param step    the work each thread repeats
     * @param barrier barrier every thread arrives at when done, or {@code null} for none
     * @return the started threads, in start order
     */
    private static List<Thread> startWorkers(Step step, CyclicBarrier barrier) {
        List<Thread> threads = new ArrayList<>(THREAD_NUM);
        for (int i = 0; i < THREAD_NUM; i++) {
            Thread thread = new Thread(new Worker(step, barrier));
            threads.add(thread);
            thread.start();
        }
        return threads;
    }

    /**
     * Blocks until every thread in {@code threads} has terminated.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void joinAll(List<Thread> threads) throws InterruptedException {
        for (Thread t : threads) {
            t.join();
        }
    }

    /**
     * LIFO demo on the unsynchronized {@code stack}. Intentionally unsafe: {@code offerLast}
     * is not atomic, and the consumer's isEmpty-then-poll sequence is a check-then-act race,
     * so {@code pollLast()} can return {@code null} and the unboxing then fails.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining workers
     */
    public void stackWithoutSync() throws InterruptedException {
        List<Thread> producers = startWorkers(new Step() {
            @Override
            public void perform() {
                stack.offerLast(1);
            }
        }, null);
        List<Thread> consumers = startWorkers(new Step() {
            @Override
            public void perform() {
                if (!stack.isEmpty()) {
                    increase(stack.pollLast());
                }
            }
        }, null);
        joinAll(producers);
        joinAll(consumers);
    }

    /**
     * LIFO demo on {@code MyAQS.SyncStack}. Consumers do not test for emptiness first; per the
     * original author's note the synchronized stack blocks the caller while it is empty.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining workers
     */
    public void stackWithSync() throws InterruptedException {
        List<Thread> producers = startWorkers(new Step() {
            @Override
            public void perform() {
                syncStack.syncOfferLast(1);
            }
        }, null);
        List<Thread> consumers = startWorkers(new Step() {
            @Override
            public void perform() throws InterruptedException {
                increase(syncStack.syncPollLast());
            }
        }, null);
        joinAll(producers);
        joinAll(consumers);
    }

    /**
     * FIFO demo on the unsynchronized {@code queue}. Intentionally unsafe in the same way as
     * {@link #stackWithoutSync()}.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining workers
     */
    public void queueWithoutSync() throws InterruptedException {
        List<Thread> producers = startWorkers(new Step() {
            @Override
            public void perform() {
                queue.offerLast(1);
            }
        }, null);
        List<Thread> consumers = startWorkers(new Step() {
            @Override
            public void perform() {
                if (!queue.isEmpty()) {
                    increase(queue.pollFirst());
                }
            }
        }, null);
        joinAll(producers);
        joinAll(consumers);
    }

    /**
     * FIFO demo on {@code MyAQS.SyncQueue}. Instead of joining the workers, the calling thread
     * and all {@code 2 * THREAD_NUM} workers meet at a {@link CyclicBarrier}; the barrier action
     * prints a message once everyone has arrived.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting at the barrier
     */
    public void queueWithSync() throws InterruptedException {
        // Parties: every producer, every consumer, plus the thread calling this method.
        final CyclicBarrier barrier = new CyclicBarrier(2 * THREAD_NUM + 1, new Runnable() {
            @Override
            public void run() {
                System.out.println("All finish!");
            }
        });

        startWorkers(new Step() {
            @Override
            public void perform() throws InterruptedException {
                syncQueue.syncOfferLast(1);
            }
        }, barrier);
        startWorkers(new Step() {
            @Override
            public void perform() throws InterruptedException {
                increase(syncQueue.syncPollFirst());
            }
        }, barrier);

        try {
            barrier.await();
        } catch (BrokenBarrierException e) {
            // Some worker failed to arrive cleanly; report it and let the caller see the partial sum.
            e.printStackTrace();
        }
    }

    /** Runs one demo and prints the resulting {@link #sum}. */
    public static void main(String[] args) {
        TestCondition test = new TestCondition();
        try {
            // Alternative demos: stackWithoutSync(), stackWithSync(), queueWithoutSync().
            test.queueWithSync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        System.out.println(test.sum);
    }
}
